package com.newegg.hardware.benchmark.kafka;

import java.util.Date;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.newegg.hardware.benchmark.elastic.ElasticClient;
import com.newegg.hardware.benchmark.elastic.ElasticRequest;

@Component
public class BenchmarkConsumer {
	static Logger logger = Logger.getLogger(BenchmarkConsumer.class);
	
	@Value("${spring.elastic.url}")
	private String hosts;
	
	@Value("${spring.elastic.index}")
	private String index;
	
	ElasticClient client;
	
	@PostConstruct
	public void init() {
		client = new ElasticClient(hosts);
	}
	
	@KafkaListener(topics="${spring.kafka.consumer.topic}")
    public void listen (ConsumerRecord<String, Map<String, Object>> record) throws Exception {
		String key = record.key();
		Map<String, Object> data = record.value();
		data.put("@timestamp", new Date());
		client.insert(new ElasticRequest(index, key, data));
    }
}
